If you’ve been following recent news in the Big Data world, you’ve probably heard about Apache Flink. This platform for batch and stream processing is built on a few significant technical innovations, has the potential to be a real game changer, and is already starting to compete with established products like Apache Spark.
In this post, I would like to show how to implement a simple batch processing algorithm using Apache Flink. We will work with a dataset of movie ratings and will produce a distribution of user ratings. Along the way, I’ll show a few tricks that you can use to improve the performance of your Flink applications.
Get the data
First, we need to get the data to work with. Here I will use free data from the GroupLens website, which hosts multiple movie rating datasets for development and research. To make things a bit faster, I will run all tests on their small development dataset with 100K ratings, but the code that I present here can be used on datasets of any size.
If you are following along, simply download and unpack the dataset. It contains four CSV files:
- ratings.csv – contains ratings of movies by users on a 0.5 to 5 scale (in half-star increments). It has four fields: user id, movie id, rating, and a timestamp of the rating
- movies.csv – contains information about the movies rated in the ratings.csv file. It has three fields: movie id, movie name, and a list of genres like “Comedy|Drama|Romance”
- links.csv – contains links between movie ids in this dataset and the corresponding ids in the IMDb and TMDb movie databases
- tags.csv – contains tags assigned by users to movies in this dataset. It has four fields: user id, movie id, a string tag, and a timestamp of when the tag was added
In this post, we will work only with data in the ratings.csv file, but in the following blog posts I will show more sophisticated algorithms that use the other files in this dataset.
If you take a look at the ratings.csv file, you will see something like this:
userId,movieId,rating,timestamp
1,31,2.5,1260759144
1,1029,3.0,1260759179
...
The first data line here means that the user with id 1 rated the movie with id 31 and gave it a rating of 2.5.
Create the project
Creating an Apache Flink project is pretty straightforward. Apache Flink developers created a project template for us, so all we need to do is use the Maven archetype:generate command:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
-DarchetypeVersion=1.1.3
It will generate a pom.xml file and several example Flink applications. To write our own, we need to create a new Java class with a main method. The same code will work both in development mode and on a Flink cluster.
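To run it on a cluster later, we can package the project and submit the job with the flink command-line tool. Here is a hedged sketch; the main class name matches this post’s example, but the jar name is a placeholder that depends on the artifactId you chose during project generation:

# Package the application (run from the project root)
mvn clean package

# Submit the job to a running Flink cluster; the jar name below is a placeholder
flink run -c RatingsDistribution target/flink-quickstart-java-1.0-SNAPSHOT.jar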
Write the code
As the result of our application, we would like to see the distribution of ratings in the movies dataset. In other words, we need to count how many times users have given one-star ratings, how many times two-star ratings, and so on.
The first thing that we need to do is to read the file. This can be done with the following:
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class RatingsDistribution {
    public static void main(String[] args) throws Exception {
        // Entry point to the Flink batch (DataSet) API
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Read the ratings file line by line
        DataSource<String> file = env.readTextFile("ml-latest-small/ratings.csv");
    }
}
In the first line, we create an execution environment that gives us access to Flink features. If we run this application on a development machine, it will start a local mini Flink cluster for testing. If we execute it on a Flink cluster, it will use the existing cluster resources to perform the data processing.
The second line of the main function reads data from the local filesystem, which is handy for development, but Flink also supports reading data from HDFS, S3, and other input sources. To read from a different source, we can use a URL like “hdfs://path/to/file” or “s3://path/to/file”, which instructs Flink to access a distributed filesystem.
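For example, here is a sketch with placeholder paths; it assumes the cluster is configured with access to HDFS or S3:

// The same readTextFile call works with distributed filesystems;
// the paths below are placeholders
DataSource<String> fromHdfs = env.readTextFile("hdfs:///path/to/ratings.csv");
DataSource<String> fromS3 = env.readTextFile("s3://my-bucket/path/to/ratings.csv");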
At this moment, we have a DataSet: an object that works as a handle for data in Flink. Every item in this dataset represents a single line from the downloaded CSV file. We can use this DataSet instance to perform data transformations like map, filter, groupBy, etc. to implement our data processing algorithm. These operations form a sort of pipeline for data processing.
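To illustrate the idea, here is a hypothetical sketch (not part of our algorithm) showing how transformations chain into a pipeline, since each of them returns a new DataSet:

// Hypothetical example: drop empty lines, then map each line to its length
DataSet<Integer> lineLengths = file
        .filter(line -> !line.isEmpty())
        .map(String::length);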
The next thing that we need to do is convert every item in this dataset into a Java object that we can process. To do this, we will use the flatMap function, which converts every input element into zero, one, or multiple output elements.
Flink already has a built-in function to read CSV files (sketched below), but I want to show how we can implement the parsing ourselves.
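For completeness, here is a hedged sketch of the built-in reader; ignoreFirstLine, includeFields, and types are part of the DataSet CSV reader API, and the field mask assumes the four-column layout of ratings.csv:

// Built-in CSV reader: skip the header and keep only the rating column
DataSet<Tuple1<Double>> csvRatings = env
        .readCsvFile("ml-latest-small/ratings.csv")
        .ignoreFirstLine()
        .includeFields("0010") // userId, movieId, rating, timestamp
        .types(Double.class);

And here is the manual implementation: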
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSource<String> file = env.readTextFile("ml-latest-small/ratings.csv");

DataSet<Tuple2<IntValue, Integer>> ratings = file.flatMap(new ExtractRating());
...
private static class ExtractRating implements FlatMapFunction<String, Tuple2<IntValue, Integer>> {
    // Mutable int field reused between calls to reduce GC pressure
    IntValue ratingValue = new IntValue();
    // Reuse the rating value and the result tuple
    Tuple2<IntValue, Integer> result = new Tuple2<>(ratingValue, 1);

    @Override
    public void flatMap(String s, Collector<Tuple2<IntValue, Integer>> collector) throws Exception {
        // Every line contains comma-separated values:
        // user id | movie id | rating | timestamp
        String[] split = s.split(",");
        String ratingStr = split[2];
        // Ignore the CSV header
        if (!ratingStr.equals("rating")) {
            int rating = (int) Double.parseDouble(ratingStr);
            ratingValue.setValue(rating);
            collector.collect(result);
        }
    }
}
There are a few things to go through here. First of all, we need to provide an instance of a FlatMapFunction that transforms input Strings into tuples. We could convert them into custom POJOs instead, but Flink has some nice utility methods that help to work with Tuples more efficiently, so we will stick with them.
To produce a tuple that can be used by the next stage of the pipeline, we need to call the collector.collect method. The only case when we do not produce an output value is when we process the header of the CSV file.
Notice that we are reusing the same Tuple2 instance between calls. While we could create a new tuple on every call, Flink allows us to reuse return values to reduce GC pressure. To reuse as many objects as possible from call to call, we also create an instance of the IntValue class and pass a reference to it into the result tuple. IntValue is a mutable counterpart of the JDK’s Integer class, so we can build the result tuple once and then only change the int value inside the existing object.
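As a small illustration of the difference (a sketch, not part of the pipeline):

// Integer is immutable: assigning a new value creates another object
Integer boxed = 1;
boxed = 2; // autoboxes a different Integer instance

// IntValue is mutable: the same instance can carry different values,
// which is what lets us reuse it between flatMap calls
IntValue reusable = new IntValue(1);
reusable.setValue(2); // no new allocation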
You can see that on every call we generate a tuple with two elements: a rating value and the number one. We will group tuples by the rating value and sum these ones to get the resulting distribution.
To do this, we need to call the groupBy method and specify a key to group elements by. Flink provides multiple ways to specify a key, but since we are using tuples, we can simply specify the position of the tuple field to group by. Since rating values are in the first field, we need to pass 0:
file.flatMap(new ExtractRating())
.groupBy(0);
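As a side note, a field position is just one of the ways to specify a key; here is a sketch of the equivalent KeySelector variant, which extracts the rating as the grouping key:

import org.apache.flink.api.java.functions.KeySelector;

// Equivalent grouping with a KeySelector instead of a field position
file.flatMap(new ExtractRating())
    .groupBy(new KeySelector<Tuple2<IntValue, Integer>, Integer>() {
        @Override
        public Integer getKey(Tuple2<IntValue, Integer> tuple) {
            return tuple.f0.getValue();
        }
    });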
At this stage, we have a grouped dataset, and we can process the elements in every group. To do this, we can use the reduceGroup method:
file.flatMap(new ExtractRating())
.groupBy(0)
.reduceGroup(new SumRatingCount());
...
private static class SumRatingCount implements GroupReduceFunction<Tuple2<IntValue, Integer>, Tuple2<IntValue, Integer>> {
    @Override
    public void reduce(Iterable<Tuple2<IntValue, Integer>> iterable, Collector<Tuple2<IntValue, Integer>> collector) throws Exception {
        IntValue rating = null;
        int ratingsCount = 0;
        // All tuples in a group share the same rating value,
        // so we only need to sum their counts
        for (Tuple2<IntValue, Integer> tuple : iterable) {
            rating = tuple.f0;
            ratingsCount += tuple.f1;
        }
        collector.collect(new Tuple2<>(rating, ratingsCount));
    }
}
The code is pretty straightforward. We need to provide an instance of GroupReduceFunction with a single reduce method that is called to process a group of elements sharing the same key. Flink passes an Iterable to access the elements in the group and a Collector (as in the flatMap function) that should be used to produce the final result.
Our pipeline is finished, but Flink won’t do anything at this stage yet. All the code that we’ve written so far only defines the structure of our application; now we need to instruct Flink to start the execution. There are multiple ways to do this, but the simplest one for development purposes is to call the print function. It will execute the data processing algorithm and write the result to stdout:
file.flatMap(new ExtractRating())
.groupBy(0)
.reduceGroup(new SumRatingCount())
.print();
Flink is not limited to this: it can also write data to filesystems and databases.
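For example, here is a sketch of writing the distribution to a CSV file instead of printing it; the output path is a placeholder, and note that a file sink, unlike print, needs an explicit env.execute() call to trigger the job:

file.flatMap(new ExtractRating())
    .groupBy(0)
    .reduceGroup(new SumRatingCount())
    .writeAsCsv("ratings-distribution.csv"); // placeholder output path

env.execute("Ratings distribution");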
Now we can simply execute the main function and inspect the result. It will start a mini cluster and print the result:
14:10:10,299 INFO org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered types and 0 default Kryo serializers
14:10:10,730 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Disabled queryable state server
14:10:10,812 INFO org.apache.flink.runtime.minicluster.FlinkMiniCluster - Starting FlinkMiniCluster.
...
14:10:14,095 INFO org.apache.flink.runtime.taskmanager.TaskManager - Task manager akka://flink/user/taskmanager_1 is completely shut down.
(3,30602)
(0,1101)
(1,5013)
(5,15095)
(2,11720)
(4,36473)
Note that ExtractRating casts ratings to int, so half-star ratings like 0.5 or 3.5 are truncated; this is why a 0 bucket appears in the output.
Simplify the code
Summing values in a group is a common operation in batch processing algorithms, and Flink has a shortcut for it. Instead of using the reduceGroup function, we can use the sum function that does exactly this. Since we are using tuples, we need to specify the number of the field to sum, which is the second field (position 1) in our case:
file.flatMap(new ExtractRating())
.groupBy(0)
.sum(1)
.print();
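Under the hood, sum is a shortcut for the more general aggregate method, so the following sketch is equivalent:

import org.apache.flink.api.java.aggregation.Aggregations;

// sum(1) is shorthand for an aggregate over field 1
file.flatMap(new ExtractRating())
    .groupBy(0)
    .aggregate(Aggregations.SUM, 1)
    .print();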
You can find the resulting application in my GitHub repo.
Conclusions
In this blog post, we’ve explored the Flink batch processing API and implemented a simple data processing algorithm. In the next blog posts, I’ll show how to implement more complicated examples using other batch processing techniques.
More information
If you want to know more about Apache Flink, you can take a look at my Pluralsight course, where I cover Apache Flink in more detail: Understanding Apache Flink